package com.ideal.hadoopadmin.crontab.hive;


import com.ideal.hadoopadmin.crontab.db.ConnectionManager;
import com.ideal.hadoopadmin.crontab.tool.Tools;
import com.ideal.tools.ssh.common.CommonProperties;
import com.ideal.tools.ssh.context.ClusterContext;
import com.ideal.tools.ssh.entity.LinuxMachine;

import java.sql.Connection;
import java.util.*;

public class FlushHiveInfo {

    /**
     *刷新hive 表信息
     1、查询tbls  148
     2、对"NAME" -> "tmpdb"和"LOCATION" -> "hdfs://ns3/user/hive/warehouse/tmpdb.db/tmp_t1" 去重
     3、在cluster_user里根据去重的Name查询userName,id
     4、在meta_hdfs_info_bak里根据LOCATION查询hdfsPath,id
     5、替换NAME和LOCATION将结果插入meta_hive_info
     */
    public void initHiveMetadataNew(ClusterContext context) {
        //读取hive数据
        String preHdfs = context.getCommonProperties().getProperty(CommonProperties.HIVE_HDFSPATH_PREFIX,"");
        List<Map<String, Object>> metaDataList = loadHiveMetaStore(preHdfs);
        //生成插入数据
        List<String> sqlListNew = createSQLNew(metaDataList);
        //清空删除  meta_hive_sql meta_hive_info
        delHiveInfo();
        //插入数据
        Tools.exeSQLBatch(sqlListNew);
    }

    /**
     * 根据db名称获取 表
     * @param dbName
     * @return
     */
    public static String[] getHiveInfoTableByDB(String dbName){
        String[] tables=null;
        String sql="select tablename from meta_hive_info where dbName ='"+dbName+"'";
        Connection connUserName = ConnectionManager.getConnection();

        List<Map<String,Object>> rs=ConnectionManager.queryDB(connUserName, sql);
        if(rs!=null&&rs.size()>0){
            tables=new String[rs.size()];
            int count=0;
            for(Map<String,Object> oneLine:rs){
                tables[count]=oneLine.get("TABLENAME").toString();
                count++;
            }
        }
        return tables;
    }

    //清空删除  meta_hive_sql meta_hive_info
    private void delHiveInfo() {
        List<String> sqlList = new ArrayList<String>();
        sqlList.add("truncate table meta_hive_info");
        sqlList.add("truncate table meta_hive_sql");
        Tools.exeSQLBatch(sqlList);
    }

    //载入 hive 元数据 基本信息
    private List<Map<String, Object>> loadHiveMetaStore(String preHdfs) {
        int count;
        StringBuffer hql = new StringBuffer();
        LoadAllHiveMeta loadAllHiveMeta = new LoadAllHiveMeta(hql).invoke();
        List<Map<String, Object>> rsList = loadAllHiveMeta.getRsList();
        HashSet nameset = loadAllHiveMeta.getNameset(), hdfsPathset = loadAllHiveMeta.getHdfsPathset();
        StringBuffer names = loadAllHiveMeta.getNames(), hdfsPaths = loadAllHiveMeta.getHdfsPaths();
        //去重
        loadHiveMetaDistinct(rsList, nameset, hdfsPathset,preHdfs);
        //拼接批量查询sql(userName)
        List<Map<String, Object>> dbNameIdRes = loadUsernameSql(hql, nameset, names);
        //拼接批量查询sql(hdfsPath)
        List<Map<String, Object>> hdfsIdRes = loadHdfsPathSql(hql, hdfsPathset, hdfsPaths);
        //替换userName->userId;hdfsPath->hdfsId
        loadReplaceId(rsList, dbNameIdRes, hdfsIdRes,preHdfs);

        return rsList;
    }

    //生成插入数据
    public List<String> createSQLNew(List<Map<String, Object>> metaDataList) {
        List<String> sql = new ArrayList<String>();
        StringBuffer sqlTem = new StringBuffer();
        for (Map<String, Object> metaData : metaDataList) {
            sqlTem.setLength(0);
            try {
                Integer.parseInt(metaData.get("NAME").toString());
                Integer.parseInt(metaData.get("LOCATION").toString());
            } catch (NumberFormatException e) {
                continue;
            }
            Tools.getCreateSqlParam(sqlTem,"insert into meta_hive_info (dbName,tableName,hdfsInfoBakId"
                        ," ,createTime,clusterUserId)values ('" + metaData.get("DBNAME").toString() + ""
                        ," ','" + metaData.get("TBL_NAME").toString()+" ','" + Integer.parseInt(metaData.get("LOCATION").toString())
                        ,"','" + metaData.get("CREATE_TIME").toString() + "','" + Integer.parseInt(metaData.get("NAME").toString()) + "')");
            sql.add(sqlTem.toString());
        }

        return sql;
    }

    //去重
    private void loadHiveMetaDistinct(List<Map<String, Object>> rsList, HashSet nameset, HashSet hdfsPathset, String preHdfs) {
        for (Map<String, Object> cols : rsList) {
            nameset.add(cols.get("NAME").toString());
            hdfsPathset.add(cols.get("LOCATION").toString().split(preHdfs)[1]);
        }
    }

    //拼接批量查询sql(userName)
    private List<Map<String, Object>> loadUsernameSql(StringBuffer hql, HashSet nameset, StringBuffer names) {
        Iterator<String> iteraName = nameset.iterator();
        int count = 1;
        while (iteraName.hasNext()) {
            String tem = iteraName.next();
            names.append(count == nameset.size() ? "'" + tem + "'" : "'" + tem + "',");
            count++;
        }
        hql.append("select userName,id from cluster_user where  userName in(" + names.toString() + ")");
        Connection connUserName = ConnectionManager.getConnection();

        return ConnectionManager.queryDB(connUserName, hql.toString());
    }

    //拼接批量查询sql(hdfsPath)
    private List<Map<String, Object>> loadHdfsPathSql(StringBuffer hql, HashSet hdfsPathset, StringBuffer hdfsPaths) {
        int count;
        Iterator<String> iteraHdfs = hdfsPathset.iterator();
        count = 1;
        hql.setLength(0);
        while (iteraHdfs.hasNext()) {
            String tem = iteraHdfs.next();
            hdfsPaths.append(count == hdfsPathset.size() ? "'" +  tem + "'" : "'" +  tem + "',");
            count++;
        }
        hql.append("select hdfsPath,id from meta_hdfs_info_bak where  hdfsPath in(" + hdfsPaths.toString() + ")");
        Connection connHdfs = ConnectionManager.getConnection();

        return ConnectionManager.queryDB(connHdfs, hql.toString());
    }

    //替换userName->userId;hdfsPath->hdfsId
    private void loadReplaceId(List<Map<String, Object>> rsList, List<Map<String, Object>> dbNameIdRes, List<Map<String, Object>> hdfsIdRes,String preHdfs) {
        for (Map<String, Object> cols : rsList) {
            cols.put("DBNAME", cols.get("NAME").toString());
            //userName替换成UserNameId
            cols.get("NAME").toString();
            for (Map<String, Object> colname : dbNameIdRes) {
                if ((cols.get("NAME").toString()).equals(colname.get("USERNAME").toString())) {
                    cols.put("NAME", colname.get("ID").toString());
                }
            }
            //hdfsPath替换成hdfsPathId
            for (Map<String, Object> colhdfs : hdfsIdRes) {
                if ((cols.get("LOCATION").toString()).equals(preHdfs+colhdfs.get("HDFSPATH").toString())) {
                    cols.put("LOCATION", colhdfs.get("ID").toString());
                }
            }

        }
    }

    //根据meta_hive_info的id更新或增加meta_hive_sql的hiveSql
    public void createDll(int hdsfId) {
        //根据id查dbName和tableName
        int count = 1;
        String dbName = "", tbName = "", tbl_id = "", serde_id = "", sd_id = "", cd_id = "", location = "", input_format = "", output_format = "";
        StringBuffer hql = new StringBuffer(), resHql = new StringBuffer(), tem = new StringBuffer();
        //根据id查询hadoopadmin_mybaties的meta_hive_info
        GetQueryAdmin getQueryAdmin = new GetQueryAdmin(hdsfId, dbName, tbName, hql).invoke();
        dbName = getQueryAdmin.getDbName();
        tbName = getQueryAdmin.getTbName();
        //查询148hivemeta
        GetQueryhiveMeta getQueryhiveMeta = new GetQueryhiveMeta(dbName, tbName, tbl_id, serde_id, cd_id, location, input_format, output_format, hql).invoke();
        tbl_id = getQueryhiveMeta.getTbl_id();
        cd_id = getQueryhiveMeta.getCd_id();
        serde_id = getQueryhiveMeta.getSerde_id();
        location = getQueryhiveMeta.getLocation();
        input_format = getQueryhiveMeta.getInput_format();
        output_format = getQueryhiveMeta.getOutput_format();
        //查询table
        List<Map<String, Object>> resTab = getQueryTable(tbName, tbl_id, hql, resHql);
        //列cols
        getQueryCols(cd_id, hql, resHql, tem);
        //分区
        getQueryPartition(tbl_id, hql, resHql, tem);
        //加载 table 参数
        getQuerySerde(serde_id, hql, resHql, tem);
        //input ouput location
        getQueryInOuLocation(location, input_format, output_format, resHql);
        //table params
        getQueryTblpro(resHql, tem, resTab);
//        System.out.println(resHql.toString());
        //insertOrupdate hadoopadmin_mybaties的meta_hive_sql
        insertOrUpdateHiveSql(hdsfId, hql, resHql);
    }

    //查询table
    private List<Map<String, Object>> getQueryTable(String tbName, String tbl_id, StringBuffer hql, StringBuffer resHql) {
        Tools.getCreateSqlParam(hql, " select param_key,param_value from table_params where ", " tbl_id='" + tbl_id + "'");
        Connection connTable = ConnectionManager.getHiveConnection();
        List<Map<String, Object>> resTab = ConnectionManager.queryDB(connTable, hql.toString());
        String tableType = "";
        for (Map<String, Object> tab : resTab) {
            if (null != tab.get("PARAM_KEY") && tab.get("PARAM_KEY").toString().contains("EXTERNAL")) {
                tableType = "EXTERNAL ";
            }
        }
        resHql.append("CREATE " + tableType + "TABLE '" + tbName.trim() + "'");

        return resTab;
    }

    //查询列cols
    private void getQueryCols(String cd_id, StringBuffer hql, StringBuffer resHql, StringBuffer tem) {
        int count;
        Tools.getCreateSqlParam(hql, " select column_name,type_name from columns_v2", " where cd_id='" + cd_id + "' ");
        Connection connCols = ConnectionManager.getHiveConnection();
        List<Map<String, Object>> resCols = ConnectionManager.queryDB(connCols, hql.toString());
        if (!resCols.isEmpty()) {
            count = getCount(tem);
            for (Map<String, Object> cols : resCols) {
                tem.append(resCols.size() != count ? "'" + cols.get("COLUMN_NAME") + "' " + cols.get("TYPE_NAME") + "," : "'" + cols.get("COLUMN_NAME") + "' " + cols.get("TYPE_NAME"));
                count++;
            }
            resHql.append(" (").append(tem).append(")\n");
        }
    }

    //分区
    private void getQueryPartition(String tbl_id, StringBuffer hql, StringBuffer resHql, StringBuffer tem) {
        int count;
        Tools.getCreateSqlParam(hql, " select pkey_name,pkey_type from partition_keys", " where tbl_id='" + tbl_id + "'");
        Connection connPar = ConnectionManager.getHiveConnection();
        List<Map<String, Object>> resPar = ConnectionManager.queryDB(connPar, hql.toString());
        if (!resPar.isEmpty()) {
            count = 1;
            tem.setLength(0);
            for (Map<String, Object> pars : resPar) {
                tem.append(resPar.size() != count ? "'" + pars.get("PKEY_NAME") + "' " + pars.get("PKEY_TYPE") + "," : "'" + pars.get("PKEY_NAME") + "' " + pars.get("PKEY_TYPE"));
                count++;
            }
            resHql.append("PARTITIONED BY (").append(tem).append(")\n");
        }
    }

    //加载 table 参数
    private void getQuerySerde(String serde_id, StringBuffer hql, StringBuffer resHql, StringBuffer tem) {
        int count;
        Tools.getCreateSqlParam(hql, " select param_key,param_value from serde_params", " where serde_id='" + serde_id + "'  order by serde_id");
        Connection connSer = ConnectionManager.getHiveConnection();
        List<Map<String, Object>> resSer = ConnectionManager.queryDB(connSer, hql.toString());
        if (!resSer.isEmpty()) {
            count = 1;
            tem.setLength(0);
            for (Map<String, Object> sers : resSer) {
                tem.append(resSer.size() != count ? convertDelimit(sers.get("PARAM_KEY").toString()) + " " + convertString(sers.get("PARAM_VALUE").toString()) + "," + "\n" : convertDelimit(sers.get("PARAM_KEY").toString()) + " " + convertString(sers.get("PARAM_VALUE").toString()) + "\n");
                count++;
            }
            resHql.append("ROW FORMAT DELIMITED").append("\n").append(tem);
        }
    }

    //处理input ouput location
    private void getQueryInOuLocation(String location, String input_format, String output_format, StringBuffer resHql) {
        if (!"".equals(input_format) || !"".equals(output_format) || !"".equals(location)) {
            resHql.append("STORED AS ");
        }
        if (!"".equals(input_format)) {
            resHql.append("INPUTFORMAT ").append(input_format).append("\n");
        }
        if (!"".equals(output_format)) {
            resHql.append("OUTPUT_FORMAT ").append(output_format).append("\n");
        }
        if (!"".equals(location)) {
            resHql.append("LOCATION ").append(location).append("\n");
        }
    }

    //处理table params
    private void getQueryTblpro(StringBuffer resHql, StringBuffer tem, List<Map<String, Object>> resTab) {
        int count;
        if (!resTab.isEmpty()) {
            count = 1;
            tem.setLength(0);
            for (Map<String, Object> tabs : resTab) {
                tem.append(resTab.size() != count ? "'" + tabs.get("PARAM_KEY") + "' " + tabs.get("PARAM_VALUE") + "," : "'" + tabs.get("PARAM_KEY") + "' " + tabs.get("PARAM_VALUE"));
                count++;
            }
            resHql.append("TBLPROPERTIES (").append(tem).append(")");
        }
    }

    ///insert或update hadoopadmin_mybaties的meta_hive_sql
    private void insertOrUpdateHiveSql(int hdsfId, StringBuffer hql, StringBuffer resHql) {
        Tools.getCreateSqlParam(hql, " SELECT hiveSql from meta_hive_sql", " where hiveInfoId='" + hdsfId + "'");
        Connection connMetaHiveSql = ConnectionManager.getConnection();
        List<Map<String, Object>> resMetaHiveSql = ConnectionManager.queryDB(connMetaHiveSql, hql.toString());
        StringBuffer hiveSql = new StringBuffer();
        if (resMetaHiveSql.isEmpty()) {
            Tools.getCreateSqlParam(hql, "insert into meta_hive_sql (hiveSql,hiveInfoId) values('" + resHql.toString().replace("'", "\\'") + "','" + hdsfId + "')");
        } else {
            Tools.getCreateSqlParam(hql, "update meta_hive_sql set hiveSql='" + resHql.toString().replace("'", "\\'") + "' where  hiveInfoId='" + hdsfId + "'");
        }
        List<String> sqlList = new ArrayList<String>();
        sqlList.add(hql.toString());
        Tools.exeSQLBatch(sqlList);
    }

    //count stringBuffer初始化
    private int getCount(StringBuffer tem) {
        int count;
        count = 1;
        tem.setLength(0);
        return count;
    }

    //serde_params的PARAM_KEY处理
    private String convertDelimit(String val) {
        String rs = "";
        if (val.startsWith("field")) {
            rs = "FIELDS TERMINATED BY ";
        } else if (val.startsWith("line")) {
            rs = "LINES TERMINATED BY ";
        } else if (val.startsWith("serialization")) {
            rs = "SERIALIZATION TERMINATED BY ";
        }
        return rs;
    }

    //转换字符
    private String convertString(String val) {
        String rs = "";
        /** ascii 转换 */
        int valLen = val.length();
        char c = 'a';
        for (int v = 0; v < valLen; v++) {
            c = val.charAt(v);
            if (c == 9)
                rs = "'\\t'";
            else if (c == 10)
                rs = "'\\n'";
            else
                rs = "'" + val + "'";
        }
        return rs;
    }

    public void call(){
//        //构造测试ClusterContext  测试用
//        Map<String, String> propertyMap = new HashMap<String, String>();
//        propertyMap.put(CommonProperties.WEBAPP_INIT_PATH, com.ideal.hadoopadmin.crontab.property.Properties.instance().getPropertyByKey(CommonProperties.WEBAPP_INIT_PATH, ""));
//        propertyMap.put(CommonProperties.HIVE_HDFSPATH_PREFIX, com.ideal.hadoopadmin.crontab.property.Properties.instance().getPropertyByKey(CommonProperties.HIVE_HDFSPATH_PRIFIX, "")); //hdfs前缀
//
//        CommonProperties commonProperties = new CommonProperties(propertyMap);
//        ClusterContext context = new ClusterContext(commonProperties);
//        //构造机器参数
//        List<LinuxMachine> machineList = new ArrayList<LinuxMachine>();
//        LinuxMachine nn1 = new LinuxMachine().initIP("10.5.24.151").initLoginName("I-Hadoop")
//                .initPassWord("ideal123").initMachineType(LinuxMachine.MachineType.WebAPP);
//        machineList.add(nn1);
//        context.setOriginalList(machineList);
        //构造结束  测试用
        //刷新meta_hive_info
        initHiveMetadataNew(Tools.getDemoClusterContextDemo());
    }

    public void callSql(int hdsfId){
        //根据meta_hive_info的id更新或增加meta_hive_sql的hiveSql
        createDll(hdsfId);
    }

    //Demo 刷新hive 表信息
    public static void main(String[] args) {
        FlushHiveInfo hive = new FlushHiveInfo();
        //刷新hive
        hive.initHiveMetadataNew(Tools.getDemoClusterContextDemo());
        //根据meta_hive_info的id更新或增加meta_hive_sql的hiveSql
        hive.createDll(3);
    }

}
